[CDK]Step Functions StateMachineの実行履歴を自動でAthenaに連携する

[CDK]Step Functions StateMachineの実行履歴を自動でAthenaに連携する

Clock Icon2024.08.27

はじめに

コンサルティング部の神野です。
Step Functions StateMachineの実行結果を一覧でまとめたり分析したいと思ったりしたことはありませんか?
CloudWatchや該当StateMachineのコンソール画面でもある程度は確認することができますが、
全StateMachineの実行履歴を1テーブルに集約して蓄積及びBIツールなどで可視化して実行結果を分析したいといったケースもあるかと思います。

その可視化する前段として、Lambdaを活用してStateMachineの実行結果履歴をAthena(S3)に連携していきたいと思います!

前提

24時間以内に実行されたStateMachineの実行履歴を連携する簡易的なシステムの構築を進めていきます。

  1. EventBridge Schedulerで深夜0時に定期バッチに見立てたStateMachineを実行
    Untitled(1)
  2. 深夜3時にStateMachineの実行履歴をAthenaに連携するLambda関数を実行
    Untitled
    • 深夜3時に24時間以内に実行されたStateMachineの実行履歴を取得するLambda関数をEventBridge Schedulerで実行
    • 取得した実行履歴をAthenaに連携
      ※データの実態はS3に格納される

構築

今回はCDK(TypeScript)を使って構築していきます。
TypeScriptで記載したLambda関数をトランスパイルせずそのままデプロイできるので採用しました。(CDK側でトランスパイル実行)

前提

CDK(TypeScript)を使用するため事前にNode.js及びCDKのインストールが必要です。もしインストールしていなければそれぞれインストールしておきます。
下記コマンドを実行してプロジェクトを作成します。

実行コマンド
cdk init --language typescript

バージョン一覧

  • Node.js 20.16.0
  • aws-cdk 2.151.0
  • @aws-cdk/aws-glue-alpha 2.151.0-alpha.0
  • @aws-cdk/aws-scheduler-alpha 2.151.0-alpha.0
  • @aws-cdk/aws-scheduler-targets-alpha 2.151.0-alpha.0

ライブラリインストール

Glue DatabaseEventBridge SchedulerなどのL2 Constructを使うため一部アルファバージョンをインストールしています。

実行コマンド
npm install @aws-cdk/aws-glue-alpha @aws-cdk/aws-scheduler-alpha @aws-cdk/aws-scheduler-targets-alpha

全体

今回は1つのStackcdk_state_machines_metrics-stack.tsに全てコードを記載していきます。
コードは詳細に説明していきます。

cdk_state_machines_metrics-stack.ts
// 必要なモジュールをインポート
import * as cdk from "aws-cdk-lib";
import { aws_lambda_nodejs, TimeZone } from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as athena from "aws-cdk-lib/aws-athena";
import * as iam from "aws-cdk-lib/aws-iam";
import * as glue from "@aws-cdk/aws-glue-alpha";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as path from "path";
import { Construct } from "constructs";
import * as aws_scheduler_alpha from "@aws-cdk/aws-scheduler-alpha";
import * as aws_scheduler_targets_alpha from "@aws-cdk/aws-scheduler-targets-alpha";

// CDKスタッククラスを定義
export class CdkStepfunctionsMetricStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // メトリクスとAthenaクエリ結果を保存するS3バケットを作成
    const bucket = new s3.Bucket(this, "StateMachineMetricsBucket", {
      bucketName: "<your-bucket-name>",
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // Athenaワークグループを作成
    const workgroup = new athena.CfnWorkGroup(
      this,
      "StateMachinesMetricsWorkgroup",
      {
        name: "StateMachinesMetricsWorkgroup",
        recursiveDeleteOption: true,
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${bucket.bucketName}/athena-results/`,
          },
          publishCloudWatchMetricsEnabled: true,
          enforceWorkGroupConfiguration: true,
          bytesScannedCutoffPerQuery: 1073741824,
        },
      }
    );

    // Glueデータベースを作成
    const database = new glue.Database(this, "StateMachinesMetricsDatabase", {
      databaseName: "statemachines_metrics_db",
    });

    // Glueテーブルを作成(Athenaでクエリ可能)
    const table = new glue.S3Table(this, "StateMachinesMetricsTable", {
      database: database,
      tableName: "statemachines_metrics",
      columns: [
        { name: "name", type: glue.Schema.STRING },
        { name: "date", type: glue.Schema.STRING },
        { name: "starttime", type: glue.Schema.STRING },
        { name: "endtime", type: glue.Schema.STRING },
        { name: "duration", type: glue.Schema.DOUBLE },
        { name: "status", type: glue.Schema.STRING },
      ],
      dataFormat: glue.DataFormat.JSON,
      bucket: bucket,
      s3Prefix: "statemachines_metrics/",
    });

    // Lambda関数用のIAMロールを作成
    const lambdaRole = new iam.Role(this, "StateMachinesMetricsLambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
    });

    // Lambda実行に必要な基本的な権限を追加
    lambdaRole.addManagedPolicy(
      iam.ManagedPolicy.fromAwsManagedPolicyName(
        "service-role/AWSLambdaBasicExecutionRole"
      )
    );

    // 必要なAWSサービスへのアクセス権限を追加
    lambdaRole.addToPolicy(
      new iam.PolicyStatement({
        actions: [
          "athena:StartQueryExecution",
          "states:GetExecutionHistory",
          "states:DescribeExecution",
          "states:ListStateMachines",
          "states:ListExecutions",
        ],
        resources: ["*"],
      })
    );

    // Glue Data Catalogへのアクセス権限を追加
    lambdaRole.addToPolicy(
      new iam.PolicyStatement({
        actions: ["glue:GetTable", "glue:GetPartitions"],
        resources: [
          "arn:aws:glue:*:*:catalog",
          `arn:aws:glue:*:*:database/${database.databaseName}`,
          `arn:aws:glue:*:*:table/${database.databaseName}/${table.tableName}`,
        ],
      })
    );

    // Lambda関数を作成
    const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
      this,
      "StateMachinesMetricsLambda",
      {
        runtime: lambda.Runtime.NODEJS_20_X,
        handler: "handler",
        entry: path.join(__dirname, "../lambda/index.ts"),
        role: lambdaRole,
        timeout: cdk.Duration.minutes(5),
        environment: {
          BUCKET_NAME: bucket.bucketName,
          ATHENA_WORKGROUP: workgroup.name,
          DATABASE_NAME: database.databaseName,
          TABLE_NAME: table.tableName,
        },
      }
    );

    // Lambda関数にS3バケットへの読み書き権限を付与
    bucket.grantReadWrite(lambdaFunction);

    // Lambda関数を実行するためのターゲットを作成
    const target = new aws_scheduler_targets_alpha.LambdaInvoke(
      lambdaFunction,
      {}
    );

    // Lambda関数を定期的に実行するスケジュールを作成
    new aws_scheduler_alpha.Schedule(this, "Schedule", {
      scheduleName: "invoke-lambda-schedule",
      schedule: aws_scheduler_alpha.ScheduleExpression.cron({
        minute: "0",
        hour: "3",
        day: "*",
        month: "*",
        year: "*",
        timeZone: TimeZone.ASIA_TOKYO,
      }),
      target,
    });

    // 5つのStateMachineを作成
    this.createMultipleStateMachines(5);
  }

  // 複数のStateMachineを作成するメソッド
  private createMultipleStateMachines(count: number) {
    for (let i = 1; i <= count; i++) {
      // ランダムで5-300秒の待機時間を生成
      const waitTime = this.generateRandomWaitTime(5, 300);

      // StateMachineの定義を作成
      const definition = sfn.Chain.start(
        new sfn.Wait(this, `Wait${i}`, {
          time: sfn.WaitTime.duration(cdk.Duration.seconds(waitTime)),
        })
      ).next(new sfn.Succeed(this, `Success${i}`));

      // StateMachineを作成
      const stateMachine = new sfn.StateMachine(this, `StateMachine${i}`, {
        definition,
        stateMachineName: `TestStateMachine${i}`,
        timeout: cdk.Duration.minutes(5),
      });

      // 各StateMachine用のスケジューラーを作成
      this.createSchedulerForStateMachine(stateMachine, i);
    }
  }

  // 指定された範囲でランダムな秒数を生成するメソッド
  private generateRandomWaitTime(min: number, max: number): number {
    return Math.floor(Math.random() * (max - min + 1) + min);
  }

  // StateMachineを実行するEventBridge Schedulerを設定するメソッド
  private createSchedulerForStateMachine(
    stateMachine: sfn.StateMachine,
    index: number
  ) {
    const target = new aws_scheduler_targets_alpha.StepFunctionsStartExecution(
      stateMachine,
      {}
    );

    new aws_scheduler_alpha.Schedule(this, `Schedule-StateMachine${index}`, {
      scheduleName: `invoke-StateMachine${index}-schedule`,
      schedule: aws_scheduler_alpha.ScheduleExpression.cron({
        minute: "0",
        hour: "0",
        day: "*",
        month: "*",
        year: "*",
        timeZone: TimeZone.ASIA_TOKYO,
      }),
      target,
    });
  }
}

S3

データ格納およびAthenaの実行結果格納用のバケットを作成します。

cdk_state_machines_metrics-stack.ts
// メトリクスとAthenaクエリ結果を保存するS3バケットを作成
const bucket = new s3.Bucket(this, "StateMachineMetricsBucket", {
  bucketName: "<your-bucket-name>",
  removalPolicy: cdk.RemovalPolicy.DESTROY,
  autoDeleteObjects: true,
});

Athena

下記リソースを作成していきます。

  • WorkGroup
    • StateMachinesMetricsWorkgroup
      • クエリ実行結果をs3://${bucket.bucketName}/athena-results/に格納するよう設定
  • Database
    • statemachines_metrics_db
  • S3Table
    • statemachines_metrics
    • 格納するデータはS3のPrefixをstatemacines_metrics/に設定
cdk_state_machines_metrics-stack.ts
// Athenaワークグループを作成
const workgroup = new athena.CfnWorkGroup(
  this,
  "StateMachinesMetricsWorkgroup",
  {
    name: "StateMachinesMetricsWorkgroup",
    recursiveDeleteOption: true,
    workGroupConfiguration: {
      resultConfiguration: {
        outputLocation: `s3://${bucket.bucketName}/athena-results/`,
      },
      publishCloudWatchMetricsEnabled: true,
      enforceWorkGroupConfiguration: true,
      bytesScannedCutoffPerQuery: 1073741824,
    },
  }
);

// Glueデータベースを作成
const database = new glue.Database(this, "StateMachinesMetricsDatabase", {
  databaseName: "statemachines_metrics_db",
});

// Glueテーブルを作成(Athenaでクエリ可能)
const table = new glue.S3Table(this, "StateMachinesMetricsTable", {
  database: database,
  tableName: "statemachines_metrics",
  columns: [
    { name: "name", type: glue.Schema.STRING },
    { name: "date", type: glue.Schema.STRING },
    { name: "starttime", type: glue.Schema.STRING },
    { name: "endtime", type: glue.Schema.STRING },
    { name: "duration", type: glue.Schema.DOUBLE },
    { name: "status", type: glue.Schema.STRING },
  ],
  dataFormat: glue.DataFormat.JSON,
  bucket: bucket,
  s3Prefix: "statemachines_metrics/",
});

作成するテーブルはStateMachineの実行履歴を格納するテーブルで各カラムの役割は下記の通りです。

テーブルのカラム一覧

カラム名 説明 データ型
name 実行したStateMachine名 String
date 実行日 String
starttime 実行の開始時間 String
endtime 実行の終了時間 String
duration 実行時間(秒) Double
status ステータス String

Lambda + EventBridge Scheduler

下記リソースを作成します。

  • Lambdaで使用するIAMロール
    下記権限を付与
    • StateMachineの実行履歴を取得できる権限
    • Glue Data Catalogへのアクセス権限
    • Athenaのクエリ実行権限
    • データ格納用バケットへの参照・書き込み権限
  • Lambda
    • StateMachineから結果を取得し、Athena(S3)にデータを連携する処理
    • lambdaフォルダに格納したソースコードを指定
    • 環境変数の設定
      • S3バケット名
      • 使用するAthenaのWorkGroup名
      • 使用するデータベース名
      • 使用するテーブル名
  • EventBridge Scheduler
    • 毎日深夜3時にLambda関数を実行するよう設定
cdk_state_machines_metrics-stack.ts
// Lambda関数用のIAMロールを作成
const lambdaRole = new iam.Role(this, "StateMachinesMetricsLambdaRole", {
  assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
});

// Lambda実行に必要な基本的な権限を追加
lambdaRole.addManagedPolicy(
  iam.ManagedPolicy.fromAwsManagedPolicyName(
    "service-role/AWSLambdaBasicExecutionRole"
  )
);

// 必要なAWSサービスへのアクセス権限を追加
lambdaRole.addToPolicy(
  new iam.PolicyStatement({
    actions: [
      "athena:StartQueryExecution",
      "states:GetExecutionHistory",
      "states:DescribeExecution",
      "states:ListStateMachines",
      "states:ListExecutions",
    ],
    resources: ["*"],
  })
);

// Glue Data Catalogへのアクセス権限を追加
lambdaRole.addToPolicy(
  new iam.PolicyStatement({
    actions: ["glue:GetTable", "glue:GetPartitions"],
    resources: [
      "arn:aws:glue:*:*:catalog",
      `arn:aws:glue:*:*:database/${database.databaseName}`,
      `arn:aws:glue:*:*:table/${database.databaseName}/${table.tableName}`,
    ],
  })
);

// Lambda関数を作成
const lambdaFunction = new aws_lambda_nodejs.NodejsFunction(
  this,
  "StateMachinesMetricsLambda",
  {
    runtime: lambda.Runtime.NODEJS_20_X,
    handler: "handler",
    entry: path.join(__dirname, "../lambda/index.ts"),
    role: lambdaRole,
    timeout: cdk.Duration.minutes(5),
    environment: {
      BUCKET_NAME: bucket.bucketName,
      ATHENA_WORKGROUP: workgroup.name,
      DATABASE_NAME: database.databaseName,
      TABLE_NAME: table.tableName,
    },
  }
);

// Lambda関数にS3バケットへの読み書き権限を付与
bucket.grantReadWrite(lambdaFunction);

// Lambda関数を実行するためのターゲットを作成
const target = new aws_scheduler_targets_alpha.LambdaInvoke(
  lambdaFunction,
  {}
);

// Lambda関数を定期的に実行するスケジュールを作成
new aws_scheduler_alpha.Schedule(this, "Schedule", {
  scheduleName: "invoke-lambda-schedule",
  schedule: aws_scheduler_alpha.ScheduleExpression.cron({
    minute: "0",
    hour: "3",
    day: "*",
    month: "*",
    year: "*",
    timeZone: TimeZone.ASIA_TOKYO,
  }),
  target,
});

StateMachine + EventBridge Scheduler

下記リソースを作成します。

  • StateMachine
    • StateMachineは大量に作成できるよう別関数に切り出して、引数で指定した数分StateMachineを作成
    • 指定の秒数を待って完了するといったシンプルな処理を作成
  • EventBridge Scheduler
    • 定期実行で深夜0時に作成したStateMachineを実行するよう設定
cdk_state_machines_metrics-stack.ts
export class CdkStateMachinesMetricsStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // 省略

    // 5つのStateMachineを作成
    this.createMultipleStateMachines(5);
  }

  private createMultipleStateMachines(count: number) {
    for (let i = 1; i <= count; i++) {
      // ランダムで5-300秒を取得する
      const waitTime = this.generateRandomWaitTime(5, 300);

      const definition = sfn.Chain.start(
        new sfn.Wait(this, `Wait${i}`, {
          time: sfn.WaitTime.duration(cdk.Duration.seconds(waitTime)),
        })
      ).next(new sfn.Succeed(this, `Success${i}`));

      const stateMachine = new sfn.StateMachine(this, `StateMachine${i}`, {
        definition,
        stateMachineName: `TestStateMachine${i}`,
        timeout: cdk.Duration.minutes(5),
      });

      // Create a scheduler for each StateMachine
      this.createSchedulerForStateMachine(stateMachine, i);
    }
  }

  // 指定された範囲でランダムな秒数を取得する関数
  private generateRandomWaitTime(min: number, max: number): number {
    return Math.floor(Math.random() * (max - min + 1) + min);
  }

  // StateMachineを実行するEventBridge Schedulerを設定する関数
  private createSchedulerForStateMachine(
    stateMachine: sfn.StateMachine,
    index: number
  ) {
    const target = new aws_scheduler_targets_alpha.StepFunctionsStartExecution(
      stateMachine,
      {}
    );

    new aws_scheduler_alpha.Schedule(this, `Schedule-StateMachine${index}`, {
      scheduleName: `invoke-StateMachine${index}-schedule`,
      schedule: aws_scheduler_alpha.ScheduleExpression.cron({
        minute: "0",
        hour: "0",
        day: "*",
        month: "*",
        year: "*",
        timeZone: TimeZone.ASIA_TOKYO,
      }),
      target,
    });
  }
}

Lambdaの処理

プロジェクトフォルダー直下にlambdaフォルダを作成し、フォルダ内にindex.tsを作成して処理を記載します。

準備

  1. npm initコマンド実行でプロジェクトの作成

    実行コマンド
    npm init
    
    # 諸々と聞かれるが、特段何も変更する箇所がなければ全てyes
    
  2. 日付の時間操作を行うので下記ライブラリをインストールしておきます。

    • date-fns
    実行コマンド
    npm install date-fns
    

処理詳細

作成するLambda関数は以下の手順で、StateMachineの実行履歴を取得し、Athenaのテーブルにデータを挿入します。

  1. StateMachine一覧の取得と実行履歴の収集
    • AWS SDKのStepFunctionsクライアントを使用してlistStateMachines APIを呼び出し、環境内の全StateMachineの一覧を取得
    • 各StateMachineに対してlistExecutions APIを呼び出し、過去24時間以内の実行結果を取得
      ※毎日連携するため、24時間以内で実行されたものに限定
    • RUNNING状態の実行結果は除外し、完了した実行のみを対象
    • ページネーションを使用して、100件ずつ実行履歴を取得
  2. データの整形とAthenaクエリの生成
    • 取得した実行結果を、Athenaテーブルのスキーマ(name, date, starttime, endtime, duration, status)に合わせて整形
    • date-fnsライブラリを使用して、日付と時間をフォーマットし、UTCからJSTに変換
    • 整形されたデータを使用して、環境変数から取得したデータベース名とテーブル名を用いてINSERTするSQLクエリを生成
  3. Athenaクエリの実行とエラーハンドリング
    • AWS SDKのAthenaClientを使用して、生成したクエリをstartQueryExecution APIで非同期に実行
    • クエリ実行時に、環境変数から取得したワークグループ名とS3出力先を指定
    • try-catchブロックを使用してエラーハンドリングを行い、エラーが発生した場合はコンソールにログを出力
  4. 実行結果のログ出力
    • 処理完了時に、成功メッセージをレスポンスとして返却
    • エラーが発生した場合は、500エラーとエラーメッセージを返却
lambda/index.ts
import { StepFunctions, Athena } from "aws-sdk";
import { APIGatewayProxyResult } from "aws-lambda";
import { format, addHours } from "date-fns";

// AWS SDKのクライアントを初期化
const sfn = new StepFunctions();
const athena = new Athena();

// 環境変数から必要な値を取得
const BUCKET_NAME = process.env.BUCKET_NAME;
const ATHENA_WORKGROUP = process.env.ATHENA_WORKGROUP;
const DATABASE_NAME = process.env.DATABASE_NAME;
const TABLE_NAME = process.env.TABLE_NAME;

// StateMachine実行結果の型定義
interface ExecutionResult {
  StateMachineName: string;
  StartTime: string;
  Date: string;
  EndTime: string;
  Duration: number;
  Status: string;
}

// Lambda関数のメインハンドラー
export const handler = async (event: any): Promise<APIGatewayProxyResult> => {
  try {
    // 全StateMachineの実行結果を取得
    const results: ExecutionResult[] = await getAllExecutions();

    // 実行結果がない場合は処理をスキップ
    if (results.length === 0)
      return {
        statusCode: 200,
        body: JSON.stringify("Skip no data"),
      };

    // Athenaクエリを生成
    const query = buildAthenaQuery(results);
    // Athenaクエリを実行
    await executeAthenaQuery(query);

    // 成功レスポンスを返す
    return {
      statusCode: 200,
      body: JSON.stringify(
        "Daily State Machines execution summary inserted into Athena table"
      ),
    };
  } catch (error) {
    // エラーハンドリング
    console.error("Error:", error);
    return {
      statusCode: 500,
      body: JSON.stringify("An error occurred while processing the request"),
    };
  }
};

// 全StateMachineの実行結果を取得する関数
async function getAllExecutions(): Promise<ExecutionResult[]> {
  const stateMachines = await getStateMachines();
  let allResults: ExecutionResult[] = [];

  for (const stateMachineArn of stateMachines) {
    const executions = await listExecutions(stateMachineArn);
    allResults = allResults.concat(executions);
  }

  return allResults;
}

// 全StateMachineのARNを取得する関数
async function getStateMachines(): Promise<string[]> {
  const response = await sfn.listStateMachines().promise();
  return (
    response.stateMachines?.map(
      (sm: StepFunctions.StateMachineListItem) => sm.stateMachineArn ?? ""
    ) || []
  );
}

// UTCからJSTに変換する関数
function toJST(date: Date): Date {
  return addHours(date, 9);
}

// 日付をフォーマットする関数
function formatDate(date: Date): string {
  return format(toJST(date), "yyyy-MM-dd HH:mm:ss");
}

// 特定のStateMachineの実行結果を取得する関数
async function listExecutions(
  stateMachineArn: string
): Promise<ExecutionResult[]> {
  const results: ExecutionResult[] = [];
  const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
  let nextToken: string | undefined;

  do {
    // StateMachineの実行履歴を取得
    const response = await sfn
      .listExecutions({
        stateMachineArn,
        maxResults: 100,
        nextToken,
      })
      .promise();

    if (response.executions) {
      for (const execution of response.executions) {
        // 24時間以内の完了した実行のみを対象とする
        if (
          execution.startDate >= twentyFourHoursAgo &&
          execution.status !== "RUNNING"
        ) {
          const endTime = execution.stopDate ? execution.stopDate : new Date();
          results.push({
            StateMachineName: stateMachineArn.split(":").pop() || "",
            StartTime: formatDate(execution.startDate),
            Date: format(toJST(execution.startDate), "yyyy-MM-dd"),
            Duration: (endTime.getTime() - execution.startDate.getTime()) / 1000,
            EndTime: formatDate(endTime),
            Status: execution.status,
          });
        } else if (execution.startDate < twentyFourHoursAgo) {
          // 24時間以上前の実行は対象外なのでループを終了
          nextToken = undefined;
          break;
        }
      }
    }

    nextToken = response.nextToken;
  } while (nextToken);

  return results;
}

// Athenaクエリを生成する関数
function buildAthenaQuery(results: ExecutionResult[]): string {
  const tableName = TABLE_NAME;
  const databaseName = DATABASE_NAME;

  // INSERTクエリの基本構造を作成
  let query = `
    INSERT INTO ${databaseName}.${tableName}
    SELECT 
      name,
      date,
      starttime,
      endtime,
      duration,
      status
    FROM (
  `;

  // 各実行結果をUNION ALLで結合
  for (let i = 0; i < results.length; i++) {
    const result = results[i];
    query += `
      SELECT
        '${result.StateMachineName}' as name,
        '${result.Date}' as date,
        '${result.StartTime}' as starttime,
        '${result.EndTime}' as endtime,
        ${result.Duration} as duration,
        '${result.Status}' as status
    `;

    if (i < results.length - 1) {
      query += "UNION ALL";
    }
  }

  query += ")";

  console.log(query);

  return query;
}

// Athenaクエリを実行する関数
async function executeAthenaQuery(query: string): Promise<void> {
  const params: Athena.StartQueryExecutionInput = {
    QueryString: query,
    QueryExecutionContext: {
      Database: DATABASE_NAME,
    },
    ResultConfiguration: {
      OutputLocation: `s3://${BUCKET_NAME}/athena-results/`,
    },
    WorkGroup: ATHENA_WORKGROUP,
  };

  // クエリを非同期で実行
  await athena.startQueryExecution(params).promise();
}

この処理により、環境内の全StateMachineの直近24時間の実行履歴が自動的にAthenaテーブルに格納され、後続の分析や可視化に利用できるようになります。

環境作成

deployコマンドを実行して、環境を作成します。

cdk deploy

動作確認

StateMachineはEventBridge Schedulerで0時に起動、Lambda関数は3時に起動してデータが連携されているので、Athenaで下記クエリを実行して連携されたデータを確認してみます。下記を使用してクエリを実行します。

  • CDKで作成したワークグループStateMachinesMetricsWorkgroupを選択
  • CDKで作成したデータベースstatemachines_metrics_dbを選択
  • CDKで作成したテーブルstatemachines_metricsへSQL実行
実行SQL
select *
from statemachines_metrics

実行結果

スクリーンショット 2024-08-27 9.51.33
実行結果

クエリを実行すると問題なく連携されていますね!

おわりに

StateMachineの実行履歴をAthenaに連携する方法はいかがだったでしょうか?
データの蓄積やBIツールでの分析などを可能にするための一歩として参考になりましたら幸いです。

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.